Support ML async job cancellation, fail jobs on redis errors#1162
mihow merged 45 commits into RolnickLab:main
Conversation
* **fix: prevent NATS connection flooding and stale job task fetching**
  - Add `connect_timeout=5`, `allow_reconnect=False` to NATS connections to prevent leaked reconnection loops from blocking Django's event loop
  - Guard `/tasks` endpoint against terminal-status jobs (return empty tasks instead of attempting NATS reserve)
  - `IncompleteJobFilter` now excludes jobs by top-level status in addition to progress JSON stages
  - Add stale worker cleanup to integration test script

  Found during PSv2 integration testing where stale ADC workers with default DataLoader parallelism overwhelmed the single uvicorn worker thread by flooding `/tasks` with concurrent NATS reserve requests.

  Co-Authored-By: Claude <[email protected]>
* **docs: PSv2 integration test session notes and NATS flooding findings**
  Session notes from the 2026-02-16 integration test, including root-cause analysis of stale-worker task competition and NATS connection issues. The findings doc tracks applied fixes and remaining TODOs with priorities.
* **docs: update session notes with successful test run #3**
  PSv2 integration test passed end-to-end (job 1380, 20/20 images). Identified `ack_wait=300s` as the cause of ~5 min idle time when GPU processes race for NATS tasks.
* **fix: batch NATS task fetch to prevent HTTP timeouts**
  Replace N×1 `reserve_task()` calls with a single `reserve_tasks()` batch fetch. The previous implementation created a new pull subscription per message (320 NATS round trips for batch=64), causing the `/tasks` endpoint to exceed HTTP client timeouts. The new approach uses one `psub.fetch()` call for the entire batch.
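The round-trip arithmetic behind the batch-fetch fix can be sketched with a toy queue. `FakeQueue` and its counter are illustrative stand-ins, not the real nats-py pull-subscription API:

```python
# Toy model: one batch fetch vs. one fetch per message.
class FakeQueue:
    def __init__(self, messages):
        self.messages = list(messages)
        self.round_trips = 0  # each fetch() models a NATS round trip

    def fetch(self, batch_size):
        self.round_trips += 1
        batch, self.messages = self.messages[:batch_size], self.messages[batch_size:]
        return batch


def reserve_one_by_one(queue, batch_size):
    """Old approach: a separate fetch (and subscription) per message."""
    tasks = []
    for _ in range(batch_size):
        got = queue.fetch(1)
        if not got:
            break
        tasks.extend(got)
    return tasks


def reserve_batched(queue, batch_size):
    """New approach: a single fetch for the whole batch."""
    return queue.fetch(batch_size)


old, new = FakeQueue(range(64)), FakeQueue(range(64))
assert len(reserve_one_by_one(old, 64)) == 64 and old.round_trips == 64
assert len(reserve_batched(new, 64)) == 64 and new.round_trips == 1
```

The real implementation pays even more per message (subscription setup adds round trips), which is how batch=64 turned into ~320 round trips.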
* **docs: add next session prompt**
* **feat: add pipeline__slug__in filter for multi-pipeline job queries**
  Workers that handle multiple pipelines can now fetch jobs for all of them in a single request: `?pipeline__slug__in=slug1,slug2`
* **chore: remove local-only docs and scripts from branch**
  These files are session notes, planning docs, and test scripts that should stay local rather than be part of the PR.
* **feat: set job dispatch_mode at creation time based on project feature flags**
  ML jobs with a pipeline now get dispatch_mode set during setup() instead of waiting until run() is called by the Celery worker. This lets the UI show the correct mode immediately after job creation.
* **fix: add timeouts to all JetStream operations and restore reconnect policy**
  Add NATS_JETSTREAM_TIMEOUT (10s) to all JetStream metadata operations via asyncio.wait_for() so a hung NATS connection fails fast instead of blocking the caller's thread indefinitely. Also restore the intended reconnect policy (2 attempts, 1s wait) that was lost in a prior force push.
* **fix: propagate NATS timeouts as 503 instead of swallowing them**
  asyncio.TimeoutError from _ensure_stream() and _ensure_consumer() was caught by the broad `except Exception` in reserve_tasks(), silently returning [] and making NATS outages indistinguishable from empty queues. Workers would then poll immediately, recreating the flooding problem.
  - Add explicit `except asyncio.TimeoutError: raise` in reserve_tasks()
  - Catch TimeoutError and OSError in the /tasks view, return 503
  - Restore allow_reconnect=False (fail-fast on connection issues)
  - Add return type annotation to get_connection()
* **fix: address review comments (log level, fetch timeout, docstring)**
  - Downgrade reserve_tasks log to DEBUG when zero tasks reserved (avoid log spam from frequent polling)
  - Pass timeout=0.5 from /tasks endpoint to avoid blocking the worker for 5s on empty queues
  - Fix docstring examples using string 'job123' for int-typed job_id
* **fix: catch nats.errors.Error in /tasks endpoint for proper 503 responses**
  NoServersError, ConnectionClosedError, and other NATS exceptions inherit from nats.errors.Error (not OSError), so they escaped the handler and returned 500 instead of 503.

Co-authored-by: Claude <[email protected]>
…olnickLab#1142)

* **feat: configurable NATS tuning and gunicorn worker management**
  Rebase onto main after RolnickLab#1135 merge. Keep only the additions unique to this branch:
  - Make TASK_TTR configurable via NATS_TASK_TTR Django setting (default 30s)
  - Make max_ack_pending configurable via NATS_MAX_ACK_PENDING setting (default 100)
  - Local dev: switch to gunicorn+UvicornWorker by default for production parity, with USE_UVICORN=1 escape hatch for raw uvicorn
  - Production: auto-detect WEB_CONCURRENCY from CPU cores (capped at 8) when not explicitly set in the environment
* **fix: address PR review comments**
  - Fix max_ack_pending falsy-zero guard (use `is not None` instead of `or`)
  - Update TaskQueueManager docstring with Args section
  - Simplify production WEB_CONCURRENCY fallback (just use nproc)

Co-authored-by: Michael Bunsen <[email protected]> Co-authored-by: Claude <[email protected]>
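The WEB_CONCURRENCY default described above amounts to the following rule. This is a sketch of the start-script behavior in Python (the actual logic lives in a shell start script), with the function name chosen for illustration:

```python
import os


def default_web_concurrency(env):
    """Worker count: WEB_CONCURRENCY from the environment if set,
    otherwise the CPU core count capped at 8."""
    explicit = env.get("WEB_CONCURRENCY")
    if explicit:
        return int(explicit)
    return min(os.cpu_count() or 1, 8)


assert default_web_concurrency({"WEB_CONCURRENCY": "4"}) == 4
assert 1 <= default_web_concurrency({}) <= 8
```

The cap avoids spawning dozens of gunicorn workers on large hosts where memory, not CPU, is the binding constraint.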
* **fix: include pipeline_slug in MinimalJobSerializer (ids_only response)**
  The ADC worker fetches jobs with ids_only=1 and expects pipeline_slug in the response to know which pipeline to run. Without it, Pydantic validation fails and the worker skips the job.
* **Update ami/jobs/serializers.py**

Co-authored-by: Claude <[email protected]> Co-authored-by: Copilot <[email protected]>
JobState(str, OrderedEnum) was using str's lexicographic __gt__ instead of OrderedEnum's definition-order __gt__, because str comes first in the MRO. This caused max(FAILURE, SUCCESS) to return SUCCESS, silently discarding failure state in concurrent job progress updates. Fix: __init_subclass__ injects comparison methods directly onto each subclass so they take MRO priority over data-type mixins. Also preserve FAILURE status through the progress ternary when progress < 1.0, so early failure detection isn't overwritten. Co-Authored-By: Claude <[email protected]>
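The MRO pitfall can be reproduced with a minimal model of the enum. This is a simplified sketch, not the actual AMI `JobState` (which has more members, and whose real fix injects all four comparison methods via `__init_subclass__`), but it shows why defining the comparison directly on the subclass works:

```python
import enum


# Buggy shape: str precedes Enum in the bases, so str's lexicographic
# __gt__ wins the MRO and "SUCCESS" > "FAILURE" alphabetically.
class BuggyState(str, enum.Enum):
    SUCCESS = "SUCCESS"
    FAILURE = "FAILURE"


assert max(BuggyState.FAILURE, BuggyState.SUCCESS) is BuggyState.SUCCESS  # failure lost!


# Fixed shape: a comparison defined on the subclass itself takes MRO
# priority over the str mixin, so definition order (severity) decides.
class FixedState(str, enum.Enum):
    SUCCESS = "SUCCESS"  # defined first => lower severity
    FAILURE = "FAILURE"  # defined later => higher severity

    def __gt__(self, other):
        if isinstance(other, FixedState):
            order = list(FixedState)
            return order.index(self) > order.index(other)
        return NotImplemented


assert max(FixedState.FAILURE, FixedState.SUCCESS) is FixedState.FAILURE  # preserved
```

This matters for concurrent progress updates: merging states with `max()` must never let a later `SUCCESS` overwrite an earlier `FAILURE`.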
The NATS message is ACK'd at line 145, before update_state() and _update_job_progress(). If either of those raises, the except block was logging "NATS will redeliver" when it won't. Co-Authored-By: Claude <[email protected]>
… carlosg/redisatomic
When a job is canceled, NATS/Redis cleanup runs before in-flight results finish processing. The resulting "Redis state missing" message is expected, not an error. Co-Authored-By: Claude <[email protected]>
Covers all monitoring points for NATS async jobs: Django ORM, REST API, tasks endpoint, NATS consumer state, Redis counters, Docker logs, and AMI worker logs. Linked from CLAUDE.md and the test_ml_job_e2e command. Co-Authored-By: Claude <[email protected]>
Tests need to set job status to STARTED since the /tasks endpoint now only serves tasks for jobs in active_states() (STARTED, RETRY). Co-Authored-By: Claude <[email protected]>
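The gating rule the tests now exercise can be sketched as follows. The member list is abbreviated, and the assumption (per the commit message) is that STARTED and RETRY are the only two active states:

```python
import enum


class JobState(str, enum.Enum):
    CREATED = "CREATED"
    PENDING = "PENDING"
    STARTED = "STARTED"
    RETRY = "RETRY"
    CANCELING = "CANCELING"
    SUCCESS = "SUCCESS"
    FAILURE = "FAILURE"
    REVOKED = "REVOKED"


def active_states():
    """States in which a job may hand out tasks to workers."""
    return {JobState.STARTED, JobState.RETRY}


def should_serve_tasks(job_status):
    """Sketch of the /tasks gate: only active jobs get tasks reserved."""
    return job_status in active_states()


assert should_serve_tasks(JobState.STARTED)
assert not should_serve_tasks(JobState.CANCELING)  # no tasks during cancellation
```

Gating on an allowlist of active states (rather than a blocklist of final states) is what prevents task serving during the CANCELING window.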
📝 Walkthrough

Adds operational tooling and robustness to async job workflows: a chaos-monkey management command, monitoring docs and help text, stronger job cancellation and cleanup semantics, NATS stream-existence checks, renames/introduces cleanup helpers, small UI and test adjustments, and logging/emit changes to refresh DB state before writing logs.
Sequence Diagram

```mermaid
sequenceDiagram
    participant Client as Client
    participant JobModel as Job Model
    participant Celery as Celery/Worker
    participant TaskQueue as NATS JetStream
    participant Redis as Redis Progress Store
    participant Cleanup as Cleanup Logic
    Client->>JobModel: cancel(job_id)
    JobModel->>JobModel: evaluate dispatch_mode
    alt ASYNC_API
        JobModel->>Celery: revoke associated task(s)
        Celery-->>JobModel: revoke ack
        JobModel->>Cleanup: cleanup_async_job_if_needed(job)
        Cleanup->>TaskQueue: TaskQueueManager.cleanup_job_resources(job_id)
        TaskQueue-->>Cleanup: delete/cleanup streams & consumers
        Cleanup->>Redis: AsyncJobStateManager.clean_state(job_id)
        Redis-->>Cleanup: state cleared
        Cleanup-->>JobModel: cleanup result
        JobModel->>JobModel: set status -> REVOKED
    else sync dispatch / other
        JobModel->>JobModel: set status -> CANCELED
    end
    JobModel-->>Client: cancellation completed
```

Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Pull request overview
This pull request enhances ML async job resilience by adding support for job cancellation, improving error handling when Redis state is lost, and introducing chaos testing utilities. The changes build on PR #1150's atomic Redis operations by ensuring proper cleanup of NATS and Redis resources in various failure and cancellation scenarios.
Changes:
- Refactored async resource cleanup to accept job_id and logger instead of Job instance, improving reliability when the Job object is unavailable
- Introduced `_fail_job` helper that marks jobs as failed and triggers cleanup when Redis state is missing
- Updated job cancellation to always clean up async resources and correctly transition CANCELING → REVOKED for async jobs
- Improved job logging with DB refresh before writes and handler instance updates to reduce lost logs
- Added `chaos_monkey` management command for fault-injection testing
- Created comprehensive monitoring documentation and updated job state checks to use `active_states()`
Reviewed changes
Copilot reviewed 12 out of 12 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| ami/jobs/models.py | Added active_states() method, updated cancel logic to cleanup async resources and handle ASYNC_API jobs, improved logger handler to always reference current instance and refresh logs from DB |
| ami/jobs/tasks.py | Added _fail_job helper for failing jobs on Redis errors, renamed cleanup function to cleanup_async_job_if_needed for consistency, updated all cleanup call sites |
| ami/jobs/views.py | Changed tasks endpoint to use active_states() instead of checking final_states, preventing task serving during cancellation |
| ami/jobs/tests.py | Updated test jobs to STARTED status to match new active_states requirement |
| ami/ml/orchestration/jobs.py | Refactored cleanup_async_job_resources signature to accept job_id and logger |
| ami/ml/orchestration/nats_queue.py | Added _stream_exists check, refactored _ensure_stream to avoid unnecessary creation, updated test to return empty list when stream doesn't exist |
| ami/ml/orchestration/tests/test_nats_queue.py | Updated to use specific nats.js.errors.NotFoundError instead of generic Exception |
| ami/jobs/management/commands/chaos_monkey.py | New management command for Redis and NATS fault injection testing |
| ami/jobs/management/commands/test_ml_job_e2e.py | Updated help text to reference monitoring documentation |
| docs/claude/reference/monitoring-async-jobs.md | New comprehensive monitoring guide for async jobs |
| ui/src/data-services/models/job.ts | Added CANCELING to non-retryable states |
| .agents/AGENTS.md | Added reference to E2E testing and monitoring documentation |
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
ui/src/data-services/models/job.ts (1)
**64-71:** ⚠️ Potential issue | 🟠 Major — Block retry while a job is already in `RETRY`.

`canRetry` now blocks `PENDING`/`CANCELING`, but it still allows retries when status is `RETRY` (an active processing state), which can permit duplicate retry actions.

Proposed fix:

```diff
 get canRetry(): boolean {
   return (
     this._job.user_permissions.includes(UserPermission.Run) &&
     this.status.code !== 'CREATED' &&
     this.status.code !== 'STARTED' &&
     this.status.code !== 'PENDING' &&
-    this.status.code !== 'CANCELING'
+    this.status.code !== 'CANCELING' &&
+    this.status.code !== 'RETRY'
   )
 }
```
ami/jobs/models.py (1)
**339-360:** ⚠️ Potential issue | 🟠 Major — Protect the full DB log-write sequence from exceptions.

Line 343 executes `refresh_from_db` outside the existing try/except. If that DB call fails, logging can raise into caller code paths and disrupt job execution.

Suggested fix:

```diff
-        self.job.refresh_from_db(fields=["logs"])
-        timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
-        msg = f"[{timestamp}] {record.levelname} {self.format(record)}"
-        if msg not in self.job.logs.stdout:
-            self.job.logs.stdout.insert(0, msg)
-
-        # Write a simpler copy of any errors to the errors field
-        if record.levelno >= logging.ERROR:
-            if record.message not in self.job.logs.stderr:
-                self.job.logs.stderr.insert(0, record.message)
-
-        if len(self.job.logs.stdout) > self.max_log_length:
-            self.job.logs.stdout = self.job.logs.stdout[: self.max_log_length]
-
-        # @TODO consider saving logs to the database periodically rather than on every log
         try:
+            self.job.refresh_from_db(fields=["logs"])
+            timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+            msg = f"[{timestamp}] {record.levelname} {self.format(record)}"
+            if msg not in self.job.logs.stdout:
+                self.job.logs.stdout.insert(0, msg)
+
+            if record.levelno >= logging.ERROR:
+                if record.message not in self.job.logs.stderr:
+                    self.job.logs.stderr.insert(0, record.message)
+
+            if len(self.job.logs.stdout) > self.max_log_length:
+                self.job.logs.stdout = self.job.logs.stdout[: self.max_log_length]
+
             self.job.save(update_fields=["logs"], update_progress=False)
         except Exception as e:
             logger.error(f"Failed to save logs for job #{self.job.pk}: {e}")
```
🧹 Nitpick comments (3)
ami/jobs/management/commands/chaos_monkey.py (3)
**78-81:** Add timeout to individual stream deletions for consistency.

The existing `delete_stream` method in `nats_queue.py` wraps the deletion call with `asyncio.wait_for(..., timeout=NATS_JETSTREAM_TIMEOUT)`. Without a timeout here, this loop could hang indefinitely if NATS becomes unresponsive mid-deletion—which is ironic for a chaos testing tool.

⏱️ Proposed fix: add a timeout to each deletion

```diff
+import asyncio
+
+NATS_JETSTREAM_TIMEOUT = 5  # or import from nats_queue if accessible
+
 async def _delete_all_streams():
     import nats

     nc = await nats.connect(NATS_URL, connect_timeout=5, allow_reconnect=False)
     js = nc.jetstream()
     try:
         streams = await js.streams_info()
         if not streams:
             return []
         deleted = []
         for stream in streams:
             name = stream.config.name
-            await js.delete_stream(name)
+            await asyncio.wait_for(js.delete_stream(name), timeout=NATS_JETSTREAM_TIMEOUT)
             deleted.append(name)
         return deleted
     finally:
         await nc.close()
```
**38-45:** Consider adding an else clause for defensive coding.

While argparse constraints currently prevent invalid combinations, adding new actions or services could result in silent no-ops if `handle()` isn't updated accordingly.

🛡️ Optional: add a defensive else clause

```diff
 if action == "flush" and service == "redis":
     self._flush_redis()
 elif action == "flush" and service == "nats":
     self._flush_nats()
+else:
+    raise CommandError(f"Unsupported action/service combination: {action} {service}")
```
**20:** Consider reading the NATS URL from settings instead of hardcoding.

Hardcoding `nats://ami_local_nats:4222` limits this command to a specific environment. If the project already has NATS configuration in Django settings (similar to how Redis uses `get_redis_connection("default")`), consider reusing it for consistency.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@ami/jobs/models.py`:
- Around line 984-989: Move the Celery revoke call to run before running
cleanup_async_job_if_needed to prevent a worker from recreating resources after
cleanup; specifically, obtain the AsyncResult via
run_job.AsyncResult(self.task_id), call task.revoke(terminate=True) (and any
needed task state checks) and self.save() immediately, then call
cleanup_async_job_if_needed(self). Update the logic in the method that currently
calls cleanup_async_job_if_needed(self) first so revoke + save happen first to
eliminate the cancellation race window.
In `@ami/jobs/tasks.py`:
- Around line 190-193: The _fail_job function currently sets job.status =
JobState.FAILURE and saves only ["status","finished_at"], which can leave
job.progress.summary.status out of sync; update job.progress.summary.status =
JobState.FAILURE (or the equivalent enum/value used elsewhere) before saving and
include the progress field in the save call (e.g., add "progress" to
update_fields) so both job.status and progress.summary.status are persisted
together and remain consistent with JobState.FAILURE.
In `@ami/ml/orchestration/nats_queue.py`:
- Around line 98-108: The _stream_exists method calls
self.js.stream_info(stream_name) without timeout protection; wrap that call in
asyncio.wait_for using the same timeout variable used elsewhere (e.g.,
self._nats_timeout) so the JetStream metadata call cannot hang indefinitely,
keeping the existing try/except NotFoundError behavior and returning True/False
as before.
In `@docs/claude/reference/monitoring-async-jobs.md`:
- Around line 167-173: The fenced ASCII lifecycle block (the triple-backtick
block showing "CREATED → PENDING → STARTED → [processing] → SUCCESS ...
FAILURE") is missing a language identifier and triggers markdownlint MD040;
update that code fence to include a language tag (e.g., change the opening ```
to ```text) so the block becomes a fenced-code block with a language identifier
while keeping the ASCII diagram content unchanged.
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (12)
- .agents/AGENTS.md
- ami/jobs/management/commands/chaos_monkey.py
- ami/jobs/management/commands/test_ml_job_e2e.py
- ami/jobs/models.py
- ami/jobs/tasks.py
- ami/jobs/tests.py
- ami/jobs/views.py
- ami/ml/orchestration/jobs.py
- ami/ml/orchestration/nats_queue.py
- ami/ml/orchestration/tests/test_nats_queue.py
- docs/claude/reference/monitoring-async-jobs.md
- ui/src/data-services/models/job.ts
Claude says — responding to CodeRabbit nitpick comments:

- **Timeout on stream deletions (L78-81):** This is a manual chaos testing tool; if NATS hangs during deletion, the developer will Ctrl+C. Adding a timeout here isn't worth it. Skipping.
- **Defensive else clause (L38-45):** argparse already rejects invalid action/service combinations. Skipping.
- **NATS_URL from settings (L20):** Already addressed; will update to read from Django settings.
Claude says — responding to CodeRabbit "outside diff" comments:

- **Protect full DB log-write sequence** (`ami/jobs/models.py`): agreed, addressed in a follow-up commit.
- **Block retry while in RETRY status** (`ui/src/data-services/models/job.ts`): agreed, addressed in a follow-up commit.
…afety

- Reorder cancel(): revoke Celery task before cleaning up async resources to prevent a theoretical race where a worker recreates state after cleanup
- Remove redundant self.save() after task.revoke() (no fields changed)
- Use update_status() in _fail_job() to keep progress.summary.status in sync with job.status
- Wrap entire log handler emit() DB sequence (refresh_from_db + mutations + save) in try/except so a DB failure during logging cannot crash callers

Co-Authored-By: Claude <[email protected]>
- Add asyncio.wait_for() wrapper to _stream_exists() stream_info call, accidentally dropped during refactor from _ensure_stream
- Read NATS_URL from Django settings in chaos_monkey command instead of hardcoding, consistent with TaskQueueManager

Co-Authored-By: Claude <[email protected]>
RETRY is an active processing state; allowing another retry while one is already running could cause duplicate execution. Co-Authored-By: Claude <[email protected]>
Add docstring explaining that TimeoutError is deliberately not caught — an unreachable NATS server should be a hard failure, not a "stream missing" false negative. Multiple reviewers questioned this behavior. Co-Authored-By: Claude <[email protected]>
Co-Authored-By: Claude <[email protected]>
Actionable comments posted: 1
🧹 Nitpick comments (2)
ami/ml/orchestration/nats_queue.py (1)
**120-123:** Consider using `logger.debug` instead of `logger.warning` for expected stream creation.

When `publish_task` is called for a new job, the stream won't exist yet—that's normal operation. Logging at `WARNING` level here may create noise in production logs for every new job.

💡 Suggested change

```diff
 if not await self._stream_exists(job_id):
     stream_name = self._get_stream_name(job_id)
     subject = self._get_subject(job_id)
-    logger.warning(f"Stream {stream_name} does not exist")
+    logger.debug(f"Stream {stream_name} does not exist, creating...")
     # Stream doesn't exist, create it
```

ui/src/data-services/models/job.ts (1)
**64-72:** Consider centralizing retry-blocked statuses to reduce future drift.

This logic is correct, but a small refactor (a constant set/array for blocked statuses) will make future status additions less error-prone.

♻️ Optional refactor

```diff
+const NON_RETRYABLE_STATUSES: ServerJobStatusCode[] = [
+  'CREATED',
+  'STARTED',
+  'PENDING',
+  'CANCELING',
+  'RETRY',
+]
+
 get canRetry(): boolean {
   return (
     this._job.user_permissions.includes(UserPermission.Run) &&
-    this.status.code !== 'CREATED' &&
-    this.status.code !== 'STARTED' &&
-    this.status.code !== 'PENDING' &&
-    this.status.code !== 'CANCELING' &&
-    this.status.code !== 'RETRY'
+    !NON_RETRYABLE_STATUSES.includes(this.status.code)
   )
 }
```
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@ami/jobs/management/commands/chaos_monkey.py`:
- Around line 65-97: The _flush_nats helper uses nc.jetstream() but calls
JetStreamManager APIs; update the inner async function _delete_all_streams to
use nc.jsm() instead of nc.jetstream() and change all subsequent calls from js.*
to jsm.*, e.g. call jsm.streams_info() and jsm.delete_stream(name) (leave
connection/close logic and the surrounding sync/try/except intact).
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
- ami/jobs/management/commands/chaos_monkey.py
- ami/jobs/models.py
- ami/jobs/tasks.py
- ami/ml/orchestration/nats_queue.py
- docs/claude/reference/monitoring-async-jobs.md
- ui/src/data-services/models/job.ts
🚧 Files skipped from review as they are similar to previous changes (1)
- docs/claude/reference/monitoring-async-jobs.md
Claude says — responding to CodeRabbit's second review round:
**Centralize retry-blocked statuses (Nitpick):** Over-engineering for 5 status checks in a getter. The inline list is clear and easy to read. Adding a separate constant adds indirection without meaningful benefit. Skipping.
Claude says: while troubleshooting a stalled job with @mihow we discovered a failure mode that isn't covered by the fixes in this PR.

**What happened:** Job 2318 (async_api, 925 images) processed 168 images successfully but the remaining 757 were never completed. The NATS consumer shows all 925 messages were delivered (4381 total delivery attempts, 756 redelivered) and the consumer is now idle with …

**The gap:** When an external worker pulls tasks via …

**Possible fix:** A stale consumer check — either as a periodic beat task or inside the …
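A minimal sketch of what the stale-consumer check proposed above could evaluate. The counter names mirror NATS consumer-info fields (`num_pending`, `num_ack_pending`, `num_waiting`), but the trigger condition itself is a hypothetical heuristic, not implemented behavior:

```python
# Hypothetical heuristic: an active job with work outstanding but no
# worker waiting to pull is a candidate for recovery (re-dispatch or fail).
def consumer_looks_stale(job_is_active, num_pending, num_ack_pending, num_waiting):
    has_outstanding_work = num_pending > 0 or num_ack_pending > 0
    no_live_worker = num_waiting == 0
    return job_is_active and has_outstanding_work and no_live_worker


# Shape of the incident above: active job, hundreds outstanding, nobody pulling.
assert consumer_looks_stale(True, num_pending=757, num_ack_pending=0, num_waiting=0)
# Healthy job: a worker currently has a pull request open.
assert not consumer_looks_stale(True, num_pending=10, num_ack_pending=5, num_waiting=1)
```

A periodic task could run this check per incomplete job and either fail the job or make its tasks eligible for re-dispatch.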

Summary
This pull request builds on #1150 and is based off `carlos/redisatomic`.

This pull request introduces a new chaos-testing management command for fault injection and refactors the async job cleanup logic to improve reliability and resilience. The most important changes include the addition of a manual chaos-testing utility, improved job log handling to prevent lost logs, and more robust cleanup of async resources for jobs using Redis and NATS. The cleanup logic is now more consistent and reliable, especially in failure and cancellation scenarios.
- New `chaos_monkey.py` management command for manual fault injection of Redis and NATS, allowing developers to flush or pause these services to simulate outages and test job resilience.
- Refactored `cleanup_async_job_resources` to accept a job ID and logger instead of a Job instance, ensuring cleanup can occur even if the Job object is unavailable and improving logging consistency.
- New `_fail_job` helper to mark jobs as failed and trigger async resource cleanup when Redis state is missing, improving failure handling in NATS pipeline results.
- Job cancellation now transitions to `REVOKED` for async jobs.
- New `_stream_exists` check in NATS queue orchestration to avoid unnecessary stream creation and improve error handling when reserving tasks.
Start a job with e.g.:
Then either cancel it in the UI or flush/stop Redis
Screenshots
Known Issues
Occasionally, error logs get overwritten by another worker, so the error won't be displayed; this is a known issue with the job logger.
Deployment Notes
Include instructions if this PR requires specific steps for its deployment (database migrations, config changes, etc.)
Checklist
Summary by CodeRabbit
New Features
Bug Fixes
Documentation